{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%matplotlib inline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "\n",
    "Large-Scale Training of Graph Neural Networks\n",
    "=============================================\n",
    "\n",
    "**Author**: Da Zheng, Chao Ma, Zheng Zhang\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In real-world tasks, many graphs are very large. For example, a recent\n",
    "snapshot of the friendship network of Facebook contains 800 million\n",
    "nodes and over 100 billion links. We are facing challenges on\n",
    "large-scale training of graph neural networks.\n",
    "\n",
    "To accelerate training on a giant graph, DGL provides two additional\n",
    "components: sampler and graph store.\n",
    "\n",
    "-  A sampler constructs small subgraphs (``NodeFlow``) from a given\n",
    "   (giant) graph. The sampler can run on a local machine as well as on\n",
    "   remote machines. Also, DGL can launch multiple parallel samplers\n",
    "   across a set of machines.\n",
    "\n",
    "-  The graph store contains graph embeddings of a giant graph, as well\n",
    "   as the graph structure. So far, we provide a shared-memory graph\n",
    "   store to support multi-processing training, which is important for\n",
    "   training on multiple GPUs and on non-uniform memory access (NUMA)\n",
    "   machines. The shared-memory graph store has a similar interface to\n",
    "   ``DGLGraph`` for programming. DGL will also support a distributed\n",
    "   graph store that can store graph embeddings across machines in the\n",
    "   future release.\n",
    "\n",
    "The figure below shows the interaction of the trainer with the samplers\n",
    "and the graph store. The trainer takes subgraphs (``NodeFlow``) from the\n",
    "sampler and fetches graph embeddings from the graph store before\n",
    "training. The trainer can push new graph embeddings to the graph store\n",
    "afterward.\n",
    "\n",
    "|image0|\n",
    "\n",
    "In this tutorial, we use control-variate sampling to demonstrate how to\n",
    "use these three DGL components, extending `the original code of\n",
    "control-variate\n",
    "sampling <https://doc.dgl.ai/tutorials/models/5_giant_graph/1_sampling_mx.html#sphx-glr-tutorials-models-5-giant-graph-1-sampling-mx-py>`__.\n",
    "Because the graph store has a similar API to ``DGLGraph``, the code is\n",
    "similar. The tutorial will mainly focus on the difference.\n",
    "\n",
    "Graph Store\n",
    "-----------\n",
    "\n",
    "The graph store has two parts: the server and the client. We need to run\n",
    "the graph store server as a daemon before training. We provide a script\n",
    "``run_store_server.py`` `(link) <https://github.com/dmlc/dgl/blob/master/examples/mxnet/sampling/run_store_server.py>`__\n",
    "that runs the graph store server and loads graph data. For example, the\n",
    "following command runs a graph store server that loads the reddit\n",
    "dataset and is configured to run with four trainers.\n",
    "\n",
    "::\n",
    "\n",
    "   python3 run_store_server.py --dataset reddit --num-workers 4\n",
    "\n",
    "The trainer uses the graph store client to access data in the graph\n",
    "store from the trainer process. A user only needs to write code in the\n",
    "trainer. We first create the graph store client that connects with the\n",
    "server. We specify ``store_type`` as “shared_memory” to connect with the\n",
    "shared-memory graph store server.\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   g = dgl.contrib.graph_store.create_graph_from_store(\"reddit\", store_type=\"shared_mem\")\n",
    "\n",
    "The `sampling\n",
    "tutorial <https://doc.dgl.ai/tutorials/models/5_giant_graph/1_sampling_mx.html#sphx-glr-tutorials-models-5-giant-graph-1-sampling-mx-py>`__\n",
    "shows the detail of sampling methods and how they are used to train\n",
    "graph neural networks such as graph convolution network. As a recap, the\n",
    "graph convolution model performs the following computation in each\n",
    "layer.\n",
    "\n",
    "\\begin{align}z_v^{(l+1)} = \\sum_{u \\in \\mathcal{N}^{(l)}(v)} \\tilde{A}_{uv} h_u^{(l)} \\qquad\n",
    "   h_v^{(l+1)} = \\sigma ( z_v^{(l+1)} W^{(l)} )\\end{align}\n",
    "\n",
    "`Control variate sampling <https://arxiv.org/abs/1710.10568>`__\n",
    "approximates $z_v^{(l+1)}$ as follows:\n",
    "\n",
    "\\begin{align}\\hat{z}_v^{(l+1)} = \\frac{\\vert \\mathcal{N}(v) \\vert }{\\vert \\hat{\\mathcal{N}}^{(l)}(v) \\vert} \\sum_{u \\in \\hat{\\mathcal{N}}^{(l)}(v)} \\tilde{A}_{uv} ( \\hat{h}_u^{(l)} - \\bar{h}_u^{(l)} ) + \\sum_{u \\in \\mathcal{N}(v)} \\tilde{A}_{uv} \\bar{h}_u^{(l)} \\\\\n",
    "   \\hat{h}_v^{(l+1)} = \\sigma ( \\hat{z}_v^{(l+1)} W^{(l)} )\\end{align}\n",
    "\n",
    "In addition to the approximation, `Chen et.\n",
    "al. <https://arxiv.org/abs/1710.10568>`__ applies a preprocessing trick\n",
    "to reduce the number of hops for sampling neighbors by one. This trick\n",
    "works for models such as Graph Convolution Networks and GraphSage. It\n",
    "preprocesses the input layer. The original GCN takes $X$ as input.\n",
    "Instead of taking $X$ as the input of the model, the trick\n",
    "computes $U^{(0)}=\\tilde{A}X$ and uses $U^{(0)}$ as the\n",
    "input of the first layer. In this way, the vertices in the first layer\n",
    "does not need to compute aggregation over their neighborhood and, thus,\n",
    "reduce the number of layers to sample by one.\n",
    "\n",
    "For a giant graph, both $\\tilde{A}$ and $X$ can be very\n",
    "large. We need to perform this operation in a distributed fashion. That\n",
    "is, each trainer takes part of the computation and the computation is\n",
    "distributed among all trainers. We can use ``update_all`` in the graph\n",
    "store to perform this computation.\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   g.update_all(fn.copy_src(src='features', out='m'),\n",
    "                fn.sum(msg='m', out='preprocess'),\n",
    "                lambda node : {'preprocess': node.data['preprocess'] * node.data['norm']})\n",
    "\n",
    "``update_all`` in the graph store runs in a distributed fashion. That\n",
    "is, all trainers need to invoke this function and take part of the\n",
    "computation. When a trainer completes its portion, it will wait for\n",
    "other trainers to complete before proceeding with its other computation.\n",
    "\n",
    "The node/edge data now live in the graph store and the access to the\n",
    "node/edge data is now a little different. The graph store no longer\n",
    "supports data access with ``g.ndata``/``g.edata``, which reads the\n",
    "entire node/edge data tensor. Instead, users have to use\n",
    "``g.nodes[node_ids].data[embed_name]`` to access data on some nodes.\n",
    "(Note: this method is also allowed in ``DGLGraph`` and ``g.ndata`` is\n",
    "simply a short syntax for ``g.nodes[:].data``). In addition, the graph\n",
    "store supports ``get_n_repr``/``set_n_repr`` for node data and\n",
    "``get_e_repr``/``set_e_repr`` for edge data.\n",
    "\n",
    "To initialize the node/edge tensors more efficiently, we provide two new\n",
    "methods in the graph store client to initialize node data and edge data\n",
    "(i.e., ``init_ndata`` for node data or ``init_edata`` for edge data).\n",
    "What happened under the hood is that these two methods send\n",
    "initialization commands to the server and the graph store server\n",
    "initializes the node/edge tensors on behalf of trainers.\n",
    "\n",
    "Here we show how we should initialize node data for control-variate\n",
    "sampling. ``h_i`` stores the history of nodes in layer ``i``;\n",
    "``agg_h_i`` stores the aggregation of the history of neighbor nodes in\n",
    "layer ``i``.\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   for i in range(n_layers):\n",
    "       g.init_ndata('h_{}'.format(i), (features.shape[0], args.n_hidden), 'float32')\n",
    "       g.init_ndata('agg_h_{}'.format(i), (features.shape[0], args.n_hidden), 'float32')\n",
    "\n",
    "After we initialize node data, we train GCN with control-variate\n",
    "sampling as below. The training code takes advantage of preprocessed\n",
    "input data in the first layer and works identically to the\n",
    "single-process training procedure.\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   for nf in NeighborSampler(g, batch_size, num_neighbors,\n",
    "                             neighbor_type='in', num_hops=L-1,\n",
    "                             seed_nodes=labeled_nodes):\n",
    "       for i in range(nf.num_blocks):\n",
    "           # aggregate history on the original graph\n",
    "           g.pull(nf.layer_parent_nid(i+1),\n",
    "                  fn.copy_src(src='h_{}'.format(i), out='m'),\n",
    "                  lambda node: {'agg_h_{}'.format(i): node.data['m'].mean(axis=1)})\n",
    "       # We need to copy data in the NodeFlow to the right context.\n",
    "       nf.copy_from_parent(ctx=right_context)\n",
    "       nf.apply_layer(0, lambda node : {'h' : layer(node.data['preprocess'])})\n",
    "       h = nf.layers[0].data['h']\n",
    "\n",
    "       for i in range(nf.num_blocks):\n",
    "           prev_h = nf.layers[i].data['h_{}'.format(i)]\n",
    "           # compute delta_h, the difference of the current activation and the history\n",
    "           nf.layers[i].data['delta_h'] = h - prev_h\n",
    "           # refresh the old history\n",
    "           nf.layers[i].data['h_{}'.format(i)] = h.detach()\n",
    "           # aggregate the delta_h\n",
    "           nf.block_compute(i,\n",
    "                            fn.copy_src(src='delta_h', out='m'),\n",
    "                            lambda node: {'delta_h': node.data['m'].mean(axis=1)})\n",
    "           delta_h = nf.layers[i + 1].data['delta_h']\n",
    "           agg_h = nf.layers[i + 1].data['agg_h_{}'.format(i)]\n",
    "           # control variate estimator\n",
    "           nf.layers[i + 1].data['h'] = delta_h + agg_h\n",
    "           nf.apply_layer(i + 1, lambda node : {'h' : layer(node.data['h'])})\n",
    "           h = nf.layers[i + 1].data['h']\n",
    "       # update history\n",
    "       nf.copy_to_parent()\n",
    "\n",
    "The complete example code can be found\n",
    "`here <https://github.com/dmlc/dgl/tree/master/examples/mxnet/sampling>`__.\n",
    "\n",
    "After showing how the shared-memory graph store is used with\n",
    "control-variate sampling, let’s see how to use it for multi-GPU training\n",
    "and how to optimize the training on a non-uniform memory access (NUMA)\n",
    "machine. A NUMA machine here means a machine with multiple processors\n",
    "and large memory. It works for all backend frameworks as long as the\n",
    "framework supports multi-processing training. If we use MXNet as the\n",
    "backend, we can use the distributed MXNet kvstore to aggregate gradients\n",
    "among processes and use the MXNet launch tool to launch multiple workers\n",
    "that run the training script. The command below launches our example\n",
    "code for multi-processing GCN training with control variate sampling and\n",
    "it runs 4 trainers.\n",
    "\n",
    "::\n",
    "\n",
    "   python3 ../incubator-mxnet/tools/launch.py -n 4 -s 1 --launcher local \\\n",
    "       python3 examples/mxnet/sampling/multi_process_train.py \\\n",
    "       --graph-name reddit \\\n",
    "       --model gcn_cv --num-neighbors 1 \\\n",
    "       --batch-size 2500 --test-batch-size 5000 \\\n",
    "       --n-hidden 64\n",
    "\n",
    "..\n",
    "\n",
    "It is fairly easy to enable multi-GPU training. All we need to do is to\n",
    "copy data to a right GPU context and invoke NodeFlow computation in that\n",
    "GPU context. As shown above, we specify a context ``right_context`` in\n",
    "``copy_from_parent``.\n",
    "\n",
    "To optimize the computation on a NUMA machine, we need to configure each\n",
    "process properly. For example, we should use the same number of\n",
    "processes as the number of NUMA nodes (usually equivalent to the number\n",
    "of processors) and bind the processes to NUMA nodes. In addition, we\n",
    "should reduce the number of OpenMP threads to the number of CPU cores in\n",
    "a processor and reduce the number of threads of the MXNet kvstore to a\n",
    "small number such as 4.\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   import numa\n",
    "   import os\n",
    "   if 'DMLC_TASK_ID' in os.environ and int(os.environ['DMLC_TASK_ID']) < 4:\n",
    "       # bind the process to a NUMA node.\n",
    "       numa.bind([int(os.environ['DMLC_TASK_ID'])])\n",
    "       # Reduce the number of OpenMP threads to match the number of\n",
    "       # CPU cores of a processor.\n",
    "       os.environ['OMP_NUM_THREADS'] = '16'\n",
    "   else:\n",
    "       # Reduce the number of OpenMP threads in the MXNet KVstore server to 4.\n",
    "       os.environ['OMP_NUM_THREADS'] = '4'\n",
    "\n",
    "Given the configuration above, NUMA-aware multi-processing training can\n",
    "accelerate training almost by a factor of 4 as shown in the figure below\n",
    "on an X1.32xlarge instance where there are 4 processors, each of which\n",
    "has 16 physical CPU cores. We can see that NUMA-unaware training cannot\n",
    "take advantage of computation power of the machine. It is even slightly\n",
    "slower than just using one of the processors in the machine. NUMA-aware\n",
    "training, on the other hand, takes about only 20 seconds to converge to\n",
    "the accuracy of 96% with 20 iterations.\n",
    "\n",
    "|image1|\n",
    "\n",
    "Distributed Sampler\n",
    "-------------------\n",
    "\n",
    "For many tasks, we found that the sampling takes a significant amount of\n",
    "time for the training process on a giant graph. So DGL supports\n",
    "distributed samplers for speeding up the sampling process on giant\n",
    "graphs. DGL allows users to launch multiple samplers on different\n",
    "machines concurrently, and each sampler can send its sampled subgraph\n",
    "(``NodeFlow``) to trainer machines continuously.\n",
    "\n",
    "To use the distributed sampler on DGL, users start both trainer and\n",
    "sampler processes on different machines. Users can find the complete\n",
    "demo code and launch scripts `in this\n",
    "link <https://github.com/dmlc/dgl/tree/master/examples/mxnet/sampling/dis_sampling>`__\n",
    "and this tutorial will focus on the main difference between\n",
    "single-machine code and distributed code.\n",
    "\n",
    "For the trainer, developers can easily migrate the existing\n",
    "single-machine sampler code to the distributed setting seamlessly by\n",
    "just changing a few lines of code. First, users need to create a\n",
    "distributed ``SamplerReceiver`` object before training:\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   sampler = dgl.contrib.sampling.SamplerReceiver(graph, ip_addr, num_sampler)\n",
    "\n",
    "The ``SamplerReceiver`` class is used for receiving remote subgraph from\n",
    "other machines. This API has three arguments: ``parent_graph``,\n",
    "``ip_address``, and ``number_of_samplers``.\n",
    "\n",
    "After that, developers can change just one line of existing\n",
    "single-machine training code like this:\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   for nf in sampler:\n",
    "       for i in range(nf.num_blocks):\n",
    "           # aggregate history on the original graph\n",
    "           g.pull(nf.layer_parent_nid(i+1),\n",
    "                  fn.copy_src(src='h_{}'.format(i), out='m'),\n",
    "                  lambda node: {'agg_h_{}'.format(i): node.data['m'].mean(axis=1)})\n",
    "\n",
    "   ...\n",
    "\n",
    "Here, we use the code ``for nf in sampler`` to replace the original\n",
    "single-machine sampling code:\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   for nf in NeighborSampler(g, batch_size, num_neighbors,\n",
    "                             neighbor_type='in', num_hops=L-1,\n",
    "                             seed_nodes=labeled_nodes):\n",
    "\n",
    "All the other parts of the original single-machine code is not changed.\n",
    "\n",
    "In addition, developers need to write sampling logic on the sampler\n",
    "machine. For neighbor-sampler, developers can just copy their existing\n",
    "single-machine code to sampler machines like this:\n",
    "\n",
    ".. code:: python\n",
    "\n",
    "   sender = dgl.contrib.sampling.SamplerSender(trainer_address)\n",
    "\n",
    "   ...\n",
    "\n",
    "   for n in num_epoch:\n",
    "       for nf in dgl.contrib.sampling.NeighborSampler(graph, batch_size, num_neighbors,\n",
    "                                                          neighbor_type='in',\n",
    "                                                          shuffle=shuffle,\n",
    "                                                          num_workers=num_workers,\n",
    "                                                          num_hops=num_hops,\n",
    "                                                          add_self_loop=add_self_loop,\n",
    "                                                          seed_nodes=seed_nodes):\n",
    "           sender.send(nf, trainer_id)\n",
    "       # tell trainer I have finished current epoch\n",
    "       sender.signal(trainer_id)\n",
    "\n",
    "The figure below shows the overall performance improvement of training\n",
    "GCN and GraphSage on the Reddit dataset after deploying the\n",
    "optimizations in this tutorial. Our NUMA optimization speeds up the\n",
    "training by a factor of 4. The distributed sampling achieves additional\n",
    "20%-40% speed improvement for different tasks.\n",
    "\n",
    "|image2|\n",
    "\n",
    "Scale to giant graphs\n",
    "---------------------\n",
    "\n",
    "Finally, we would like to demonstrate the scalability of DGL with giant\n",
    "synthetic graphs. We create three large power-law graphs with\n",
    "`RMAT <http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf>`__. Each\n",
    "node is associated with 100 features and we compute node embeddings with\n",
    "64 dimensions. Below shows the training speed and memory consumption of\n",
    "GCN with neighbor sampling.\n",
    "\n",
    "====== ====== ================== ===========\n",
    "#Nodes #Edges Time per epoch (s) Memory (GB)\n",
    "====== ====== ================== ===========\n",
    "5M     250M   4.7                8\n",
    "50M    2.5B   46                 75\n",
    "500M   25B    505                740\n",
    "====== ====== ================== ===========\n",
    "\n",
    "We can see that DGL can scale to graphs with up to 500M nodes and 25B\n",
    "edges.\n",
    "\n",
    ".. |image0| image:: https://s3.us-east-2.amazonaws.com/dgl.ai/tutorial/sampling/arch.png\n",
    ".. |image1| image:: https://s3.us-east-2.amazonaws.com/dgl.ai/tutorial/sampling/NUMA_speedup.png\n",
    ".. |image2| image:: https://s3.us-east-2.amazonaws.com/dgl.ai/tutorial/sampling/whole_speedup.png\n",
    "\n",
    "\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
